Regression with kernel smoothing

Or

"Mr. Smoothie's Smoothie machine Smoother", a Story in Seven Parts

What are we doing here?

We're doing regression!

  • start with kNN
  • generalize how the points in the training data are accounted for
  • generalize the way that the regression function is calculated
  • discuss the problems

We will use simple data structures and lots of visualization to give a clear picture of what is happening in each step. $\texttt{scikit-learn}$ will be used only when the implementation gets complicated.

This is basically a reproduction of The Elements of Statistical Learning (ESL), Chapter 6, first few sections.

Background

Presume a relationship between input data $X$ and target data $Y$:

$Y = f(X) + \textrm{noise}$

The goal in regression is to calculate a function $\hat{f}(X)$ that is a good estimate of $f(X)$.
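
A standard way to make "good estimate" precise is to ask that $\hat{f}$ have a small expected squared error, $E\big[(Y - \hat{f}(X))^2\big]$; the mean squared errors computed below are finite-sample estimates of this quantity.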

Setup


In [ ]:
import sys
import heapq

import sklearn
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import norm

%matplotlib inline

Start with some simple examples of data distributions.

Remember: all parameter choices matter! We'll study the effects of our choices later.


In [ ]:
n_pts = 70 # number of data points
X = np.linspace(-1,1,n_pts) # build some X data

In [ ]:
# standard deviation of the Gaussian noise
sigma = 1  
# get the corresponding Y data for a perfectly linear relationship with Gaussian noise
Y = X + (np.random.randn(n_pts)*sigma) 
_ = plt.scatter(X,Y)

Now let's consider a more complicated relationship between the variables.


In [ ]:
def f(X):
    # a cubic function of X
    return -10*X**3 - 2*X**2

Switch to the familiar notation of training and test samples.

Note: we will generate a single array of x-values, and draw test and training sets of y-values.


In [ ]:
X_train = X
Y_train = f(X) + (np.random.randn(n_pts)*sigma)
_ = plt.scatter(X_train,Y_train)

Let's see how well ordinary linear regression does.


In [ ]:
# http://jmduke.com/posts/basic-linear-regressions-in-python/

def basic_linear_regression(x, y):
    """
    Use least-square minimization to compute the regression coefficients
    for a 1-dim linear model.
    
    parameters:
        x: array of values for the independent (feature) variable 
        y: array of values for the dependent (target) variable
        
    return value:
        2-tuple of slope and y-intercept
    """
    
    # Basic computations to save a little time.
    length = len(x)
    sum_x = sum(x)
    sum_y = sum(y)

    # Σx^2, and Σxy respectively.
    sum_x_squared = sum(map(lambda a: a * a, x))
    sum_of_products = sum([x[i] * y[i] for i in range(length)])

    # Closed-form least-squares estimates: slope (a) and y-intercept (b).
    a = (sum_of_products - (sum_x * sum_y) / length) / (sum_x_squared - ((sum_x ** 2) / length))
    b = (sum_y - a * sum_x) / length
    return a, b

In [ ]:
B_1,B_0 = basic_linear_regression(X_train,Y_train)

Make a plotting function that makes a scatter plot of the data overlaid with the estimated regression function, $\hat{f}$.


In [ ]:
def plot(X,Y,Y_hat):
    """
    Plot data and estimated regression function
    
    Parameters:
        X: independent variable
        Y: dependent variable
        Y_hat: estimate of the dependent variable; f_hat(X)
    """
    plt.scatter(X,Y,label='data')
    plt.plot(X,Y_hat,label='estimate',color='g',linewidth=2)
    plt.legend()

In [ ]:
Y_hat_train = X_train*B_1 + B_0 

plot(X_train,Y_train,Y_hat_train)

How can we quantify the quality of the regression?
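
Here we use the mean squared error,

$\textrm{MSE} = \frac{1}{N}\sum_{i=1}^{N} \big(y_i - \hat{f}(x_i)\big)^2$,

computed once against the training targets and once against an independent set of test targets drawn at the same x-values.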


In [ ]:
def mse(Y_hat_train,Y_train,Y_test,print_results=True):
    """ 
    Print mean squared error for test and train data
    
    Parameters:
        Y_hat_train: estimated y-values for the training set
        Y_train: true y-values for the training set
        Y_test: true y-values for an independant test set, based on the _same_ x-values as Y_train. 
    
    Return value:
        tuple(training error, test error)
    
    """
    train_err = np.mean([abs(yh-y)**2 for y,yh in zip(Y_train,Y_hat_train)])
    test_err = np.mean([abs(yh-y)**2 for y,yh in zip(Y_test,Y_hat_train)])
    
    if print_results:
        print("train err: {0:.3f}".format(train_err))
        print("test err: {0:.3f}".format(test_err))
    else:
        return train_err,test_err

In [ ]:
# draw a _test_ sample from f(X)
Y_test =  f(X) + (np.random.randn(n_pts)*sigma)

In [ ]:
mse(Y_hat_train,Y_train,Y_test)

k Nearest Neighbors

Remember how kNN works:

The value of the function $\hat{f}$ is calculated at every point $x_0$ in $X$ and is given by the average of the $y$ values for the $k$ nearest neighbors in the training data.

$\hat{f}(x)=Average(y_i |~ x_i\in N_k(x))$,

where $N_k(x)$ is the set of $k$ nearest neighbor points to $x$.


In [ ]:
def kNN(X,Y,x_0,k=20):
    """
    Simple 1-D implementation of kNN average.
    
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        k: number of nearest neighbors to include
    
    Return value:
        The estimated regression function at x_0.
    
    For our purposes, think of a heapq object as a sorted list with many nice performance properties. 
    The first item is always the smallest. For items that are tuples, the default is to sort
    by the first element in the tuple.
    """
    nearest_neighbors = []
    for x,y in zip(X,Y):
        distance = abs(x-x_0)
        heapq.heappush(nearest_neighbors,(distance,y))    
    return np.mean( [heapq.heappop(nearest_neighbors)[1] for _ in range(k)] )

In [ ]:
k = 15
Y_hat_train_knn = [kNN(X_train,Y_train,x_0,k=k) for x_0 in X_train]

In [ ]:
plot(X_train,Y_train,Y_hat_train_knn)

In [ ]:
mse(Y_hat_train_knn,Y_train,Y_test)

As $k\rightarrow 1$, the model reproduces the training data exactly, so the training error goes to zero. But the test error grows, because the variance of the estimate increases.
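
As a quick illustration (a sketch that uses only the $\texttt{kNN}$ and $\texttt{mse}$ helpers defined above, with $\texttt{print}\_\texttt{results=False}$ so that $\texttt{mse}$ returns the errors), we can scan $k$ and plot the training and test errors:


In [ ]:
ks = range(1,40)
errors = [mse([kNN(X_train,Y_train,x_0,k=k) for x_0 in X_train]
              ,Y_train
              ,Y_test
              ,print_results=False)
          for k in ks]
plt.plot(ks,[e[0] for e in errors],'g.',ms=10,label="train error")
plt.plot(ks,[e[1] for e in errors],'b.',ms=10,label="test error")
_ = plt.legend(loc='best')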

Kernel smoothing

The neighborhood $N_k(x)$ plays the role of a kernel: it determines how much each training point contributes to the estimate at $x$. We can think of $N_k$ as assigning a weight of $0$ or $1$ to every point in the training data, depending on whether that point is among the $k$ nearest neighbors of $x$.

We can generalize the kNN function above to calculate the weighted average of $Y_{train}$ at $x_0$, for an arbitrary kernel.
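
This weighted average is the Nadaraya-Watson kernel-weighted estimate (ESL, Chapter 6):

$\hat{f}(x_0) = \frac{\sum_{i=1}^{N} K_\lambda(x_0,x_i)\,y_i}{\sum_{i=1}^{N} K_\lambda(x_0,x_i)}$,

where $K_\lambda(x_0,x_i)$ is the weight that the kernel, with width $\lambda$, assigns to the training point $x_i$.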


In [ ]:
def kernel_smoother(X,Y,x_0,kernel,width):
    """
    Generalization of 1-D kNN average, with custom kernel.
    
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        kernel: kernel function
        width: kernel width
    
    Return value:
        The estimated regression function at x_0.
    """
    kernel_weights = [kernel(x_0,x,width) for x in X]
    
    weighted_average = np.average(Y,weights=kernel_weights)
    return weighted_average
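
The first kernel we use is the Epanechnikov kernel,

$D(t) = \frac{3}{4}(1-t^2)$ for $|t| \le 1$, and $0$ otherwise, with $t = |x - x_0| / \lambda$ for kernel width $\lambda$.
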

In [ ]:
def epanechnikov_kernel(x_0,x,width):
    """
    Return the Epanechnikov weight of the point x, relative to the point x_0,
    for the given kernel width.
    """
    def D(t):
        if t <= 1:
            # note: under Python 2, writing 3/4*float(1-t*t) returns 0, because 3/4
            # is evaluated first with integer division; this ordering avoids that
            return float(1-t*t)*3/4
        else:
            return 0
    return D(abs(x-x_0)/width)

In [ ]:
# plot the Epanechnikov kernel at x_0 = 0, width = 1 to get a sense for it
Y = [epanechnikov_kernel(0,x,1) for x in X]
_ = plt.plot(X,Y)

Using the kernel smoother with an Epanechnikov kernel, make a better prediction function.


In [ ]:
width=0.35
Y_hat_train_epan_kernel = [kernel_smoother(X_train
                                           ,Y_train
                                           ,x_0
                                           ,kernel=epanechnikov_kernel
                                           ,width=width) 
                           for x_0 in X_train]

In [ ]:
plot(X_train,Y_train,Y_hat_train_epan_kernel)

In [ ]:
mse(Y_hat_train_epan_kernel,Y_train,Y_test)

There are other kernels.
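
Two that we will look at are the tri-cube kernel,

$D(t) = (1-|t|^3)^3$ for $|t| \le 1$, and $0$ otherwise,

which is compact and smoother at the boundary of its support than the Epanechnikov kernel, and the Gaussian kernel, $D(t) = \phi(t)$ (the standard normal density), which assigns a non-zero weight to every training point.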


In [ ]:
def tri_cube_kernel(x_0,x,width):
    """
    Return the tri-cube weight of the point x, relative to the point x_0,
    for the given kernel width.
    """
    def D(t):
        if t <= 1:
            return float(1-t*t*t)**3
        else:
            return 0
    return D(abs(x-x_0)/width)

In [ ]:
# plot some kernels at x_0 = 0, width = 1 to get a sense for them
Y1 = [epanechnikov_kernel(0,x,1) for x in X]
Y2 = [tri_cube_kernel(0,x,1) for x in X]
Y3 = [norm.pdf(x) for x in X]
plt.plot(X,Y1,label="Epanechnikov")
plt.plot(X,Y2,color='g',label="tri-cube")
plt.plot(X,Y3,color='k',label="Gaussian")
plt.legend(loc='best')

In [ ]:
Y_hat_train_tri_kernel = [kernel_smoother(X_train
                                          ,Y_train
                                          ,x_0,kernel=tri_cube_kernel
                                          ,width=width) 
                          for x_0 in X_train]

In [ ]:
plot(X_train,Y_train,Y_hat_train_tri_kernel)

In [ ]:
mse(Y_hat_train_tri_kernel,Y_train,Y_test)

Local Linear Regression

Kernel-weighted averages can be badly biased at the boundaries of the domain. Manage this bias by replacing the weighted average with a weighted linear fit.

For each point $x_0$ in X:

  1. use a kernel to get a set of weights for all points in the training data
  2. do a weighted linear regression on those points (and weights) to determine the least-squares parameters, the slope ($\beta_1$) and y-intercept ($\beta_0$), as written out below
  3. calculate the estimated regression function at $x_0$: $\hat{f}(x_0) = \beta_0 + \beta_1 x_0$
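
In other words, step 2 solves, at each target point $x_0$, the kernel-weighted least-squares problem

$\min_{\beta_0,\beta_1} \sum_{i=1}^{N} K_\lambda(x_0,x_i)\,\big[y_i - \beta_0 - \beta_1 x_i\big]^2$,

and the fit is then evaluated only at the single point $x_0$.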

In [ ]:
from sklearn.linear_model import LinearRegression

def linear_kernel_model(X,Y,x_0,kernel,width):
    """
    1-D kernel-smoothed model with local linear regression.
    
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        kernel: kernel function
        width: kernel width
    
    Return value:
        The estimated regression function at x_0.
    """
    kernel_weights = [kernel(x_0,x,width) for x in X]
    
    # scikit-learn expects a 2-D feature array: one row per sample, one column per feature
    X = [[x] for x in X]
    
    wls_model = LinearRegression()
    wls_model.fit(X,Y,kernel_weights)

    B_0 = wls_model.intercept_
    B_1 = wls_model.coef_[0]
    
    y_hat = B_0 + B_1*x_0
    
    return y_hat

In [ ]:
Y_hat_train_linear_reg_epan_kernel = [
    linear_kernel_model(X_train
                        ,Y_train
                        ,x_0
                        ,kernel=epanechnikov_kernel
                        ,width=width) 
    for x_0 in X_train]

In [ ]:
plot(X_train,Y_train,Y_hat_train_linear_reg_epan_kernel)

In [ ]:
mse(Y_hat_train_linear_reg_epan_kernel,Y_train,Y_test)

How can we optimize the value of meta-parameters like the kernel width?

Remember, the optimal value of such a parameter depends on other choices, like the number of data points $\texttt{n}\_\texttt{pts}$ and the noise level $\texttt{sigma}$.


In [ ]:
def test_width(X_train,Y_train,Y_test,values,model=linear_kernel_model,kernel=epanechnikov_kernel):
    """
    Make a plot of the test and training mse as a function of some parameter
    """
    train_errors = []
    test_errors = []
    for width in values:
        Y_hat_train = [
            model(X_train
                ,Y_train
                ,x_0,kernel=kernel
                ,width=width
                ) 
        for x_0 in X_train]
        train_err,test_err = mse(Y_hat_train,Y_train,Y_test,print_results=False)
        train_errors.append(train_err)
        test_errors.append(test_err)
    plt.plot(values,train_errors,'g.',ms=10,label="train error")
    plt.plot(values,test_errors,'b.',ms=10,label="test error")
    plt.legend(loc='best')

In [ ]:
widths = np.linspace(0.001,1,50) # 50 evenly-spaced width values
test_width(X_train,Y_train,Y_test,widths,model=linear_kernel_model,kernel=epanechnikov_kernel)

In [ ]:
width = 0.2 # choice of kernel width, informed by the scan above

Higher-dimensional data

Make some functions of multi-dimensional input data.


In [ ]:
def f_2D(X):
    # function of two features: a cubic term plus an interaction term
    return [(2*x[1]**3) + x[1]*x[0]*4 for x in X]
def f_3D(X):
    # function of three features: quadratic and cubic terms plus a three-way interaction
    return [(-2*x[0]*x[0]) + (2*x[1]**3) + x[2]*x[1]*x[0]*4 for x in X]
def f_nD_poly(X,n=2):
    """
    For each sample in X, return the sum of x**n over its feature dimensions.
    """
    return [ sum([x**n for x in dim]) for dim in X]

Generate random data in 2 dimensions.


In [ ]:
import random
n_pts = 50
X_train = np.array([[random.random(),random.random()] for _ in range(n_pts)])

In [ ]:
sigma = 0.1
Y_train = f_nD_poly(X_train,2) + ( np.random.randn(n_pts) * sigma )
Y_test = f_nD_poly(X_train,2) + ( np.random.randn(n_pts) * sigma )

Test first with a multi-dimensional kNN.


In [ ]:
def kNN_multiD(X,Y,x_0,k=20,kernel_pars=None):
    """
    Simple multi-dimensional implementation of kNN average.
    
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        k: number of nearest neighbors to include
        kernel_pars: unused; accepted so the signature matches the kernel-based models
    
    Return value:
        The estimated regression function at x_0.

    Note: use numpy.linalg.norm for N-dim norms.
    """
    nearest_neighbors = []
    for x,y in zip(X,Y):
        distance = np.linalg.norm(np.array(x)-np.array(x_0))
        heapq.heappush(nearest_neighbors,(distance,y))    
    return np.mean( [heapq.heappop(nearest_neighbors)[1] for _ in range(k)] )

Remember that, because the dimensionality of the features is no longer one (and the target function and noise level have changed), the scale of the error will be different. So don't compare the numbers below with those from the 1-D examples above.


In [ ]:
Y_hat_train = [
    kNN_multiD(X_train,Y_train,x_0,k=k) 
    for x_0 in X_train]
mse(Y_hat_train,Y_train,Y_test)

Multi-dimensional versions of kernel and model.


In [ ]:
def epanechnikov_kernel_multiD(x_0,x,width=1):
    """
    Epanechnikov weight of the point x relative to the point x_0,
    using the Euclidean distance between them.
    """
    def D(t):
        if t <= 1:
            return float(1-t*t)*3/4
        else:
            return 0
    return D(np.linalg.norm(np.array(x)-np.array(x_0))/width)

Let's also generalize the model so that the local regression need not be ordinary linear regression.


In [ ]:
def generalized_kernel_model(X,Y,x_0,kernel=epanechnikov_kernel_multiD,kernel_pars={},regressor=LinearRegression):
    """
    Multi-D kernel-smoothed model with local generalized regression.
    
    Parameters:
        X: the vector of feature data
        Y: the vector of target data
        x_0: a particular point in the feature space
        kernel: kernel function
        kernel_pars: dict of keyword arguments passed to the kernel (e.g. width)
        regressor: regression class - must follow the scikit-learn estimator API
    
    Return value:
        The estimated regression function at x_0.
    """
    kernel_weights = [kernel(x_0,x,**kernel_pars) for x in X]
    model = regressor()
    model.fit(X,Y,sample_weight=kernel_weights)
  
    return model.predict([x_0])[0]

In [ ]:
from sklearn.linear_model import Ridge,Lasso,ElasticNet
regressor=LinearRegression

width = 0.5
Y_hat_train = [
    generalized_kernel_model(X_train
                             ,Y_train
                             ,x_0
                             ,kernel=epanechnikov_kernel_multiD
                             ,kernel_pars={"width":width}
                             ,regressor=regressor) 
    for x_0 in X_train]

Compare the MSE here to that of the kNN above.


In [ ]:
mse(Y_hat_train,Y_train,Y_test)

Experiments

Build an API that allows you to optimize any parameter by visualizing the test and training errors.


In [ ]:
def test_parameter(parameter_name,values,args):
    """
    Make a plot of the test and training mse as a function of some parameter
    
    Parameters:
        parameter name:
        values: 
        args:
    
    """
    train_errors = []
    test_errors = []
    
    # get the dictionary element and shortened name for the variable parameter
    par_name = ""
    
    X_train = np.array([[random.random() for _ in range(args['main']['n_dim'])] for _ in range(args['main']['n_pts'])])
    Y_train = args['main']['func'](X_train) + ( np.random.randn(args['main']['n_pts']) * args['main']['sigma'] )
    Y_test = args['main']['func'](X_train) + ( np.random.randn(args['main']['n_pts']) * args['main']['sigma'] )
    
    for value in values:
        # set the value of the variable parameter for this iteration
        location = args
        for key_num,key in enumerate(parameter_name.split(':')):
            par_name = key
            if key_num+1 < len(parameter_name.split(':')): 
                location = location[key]
            else:
                location[key] = value  
        
        Y_hat_train = [
            args['main']['model_name'](X_train
                ,Y_train
                ,x_0
                ,kernel=args['main']['kernel']
                ,**args['model']
                ) 
        for x_0 in X_train]
                
        train_err,test_err = mse(Y_hat_train,Y_train,Y_test,print_results=False)
        train_errors.append(train_err)
        test_errors.append(test_err)
        
    plt.plot(values,train_errors,'g.',ms=15,label="train error")
    plt.plot(values,test_errors,'b.',ms=15,label="test error")
    plt.title(par_name)
    plt.legend(loc='best')

In [ ]:
arguments = {
    "main":{
        "func":f_nD_poly,
        "model_name":generalized_kernel_model,
        "kernel":epanechnikov_kernel_multiD,
        "n_pts":30,
        "sigma":0.1,
        "n_dim":2
    },
    "model":{
        "regressor":LinearRegression,
        "kernel_pars":{
            "width":0.2
        }
    }
}
test_parameter("model:kernel_pars:width",np.linspace(0.01,1.5,30),arguments)

In [ ]: